Billy McMonagle
01/15/2021, 8:42 PM
jeff n
01/15/2021, 11:16 PM
projects
with one set of runs and accounts
with another set of runs but both of the runs are the same flow code and need to run separately.
BK Lau
01/16/2021, 12:31 AM
Riley Hun
01/16/2021, 12:40 AM
PermissionError
(see reply for full stacktrace of error). Tried to look through the thread to see if others have encountered a similar error, but no luck. Any insight on this?
Sonny
01/16/2021, 1:44 AM
Hui Zheng
01/16/2021, 3:35 AM
Amanda Wee
01/16/2021, 11:56 AM
idempotency_key
matches the one from the previous flow registration, the flow version is not bumped. However, does the serialised flow get uploaded to storage anyway? I'm too much of a newbie at the codebase (and graphql in particular) to make sense of the details of the flow registration code.
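For reference, a minimal sketch of how an idempotency key is typically supplied at registration (assuming a 0.14.x release where Flow.serialized_hash() is available); as far as I can tell, the storage build/upload is controlled separately by the build flag and runs client-side before the registration call:

from prefect import Flow, task

@task
def say_hi():
    print("hi")

with Flow("idempotency-example") as flow:  # hypothetical flow
    say_hi()

# If the key matches the previous registration, the backend skips the version
# bump; pass build=False to also skip rebuilding/uploading the flow's storage.
flow.register(
    project_name="Test",                     # illustrative project name
    idempotency_key=flow.serialized_hash(),  # stable key for an unchanged flow
    build=True,
)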
Tadas
01/16/2021, 3:51 PM
Marwan Sarieddine
01/16/2021, 4:55 PM
jack
01/18/2021, 2:30 AM
Aiden Price
01/18/2021, 7:16 AM
[2021-01-17 00:00:11,063] ERROR - Prefect-Kubed | Error while managing existing k8s jobs
Traceback (most recent call last):
File "/usr/local/.venv/lib/python3.8/site-packages/prefect/agent/kubernetes/agent.py", line 362, in heartbeat
self.manage_jobs()
File "/usr/local/.venv/lib/python3.8/site-packages/prefect/agent/kubernetes/agent.py", line 219, in manage_jobs
event.last_timestamp
TypeError: '<' not supported between instances of 'NoneType' and 'datetime.datetime'
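The TypeError appears to come from comparing Kubernetes event timestamps when some events carry no last_timestamp. A small illustrative sketch of the failure mode and a defensive ordering (not the agent's actual code):

from datetime import datetime, timezone

# Stand-ins for k8s event objects; last_timestamp can be None for some events.
events = [
    {"last_timestamp": datetime(2021, 1, 17, tzinfo=timezone.utc)},
    {"last_timestamp": None},
]

# sorted(events, key=lambda e: e["last_timestamp"])  # raises the same TypeError

# Defensive version: substitute a sentinel so None never meets a datetime.
sentinel = datetime.min.replace(tzinfo=timezone.utc)
ordered = sorted(events, key=lambda e: e["last_timestamp"] or sentinel)
print([e["last_timestamp"] for e in ordered])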
Sven Teresniak
01/18/2021, 9:21 AM
ValueError: Multiple flows cannot be used with the same resource block
In which direction do I have to search for a solution? We heavily rely on a ResourceManager available for multiple flows…
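For context, a sketch of the usual resource_manager pattern: the decorated class can be shared across flows, but each flow opens its own resource block inside its own context. The names and the fake handle below are illustrative:

from prefect import Flow, task, resource_manager

@resource_manager
class MyResource:  # hypothetical resource
    def __init__(self, conf):
        self.conf = conf

    def setup(self):
        return {"conf": self.conf}  # stand-in for a real client/connection

    def cleanup(self, handle):
        pass  # tear the resource down here

@task
def use(handle):
    return handle

with Flow("flow-a") as flow_a:
    with MyResource("conf-a") as handle:
        use(handle)

with Flow("flow-b") as flow_b:
    with MyResource("conf-b") as handle:  # a fresh block, not the one from flow_a
        use(handle)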
Adam Roderick
01/18/2021, 12:42 PM
Matic Lubej
01/18/2021, 1:46 PM
Matic Lubej
01/18/2021, 2:05 PM
Matic Lubej
01/18/2021, 2:06 PM
Kieran
01/18/2021, 2:40 PM
default_client = docker.from_env()

FLOW_NAME = "hello-flow"
flow_schedule = CronSchedule("0 8 * * *")
flow_storage = Docker(
    base_url=default_client.api.base_url,
    tls_config=docker.TLSConfig(default_client.api.cert),
    registry_url="_________.dkr.ecr.eu-west-2.amazonaws.com/xxxxx/prefect"
)
flow_run_config = ECSRun(
    cpu="512",
    memory="512",
    run_task_kwargs={"requiresCompatibilities": ["FARGATE"], "compatibilities": ["FARGATE"]}
)

with Flow(
    name=FLOW_NAME,
    schedule=flow_schedule,
    storage=flow_storage,
    run_config=flow_run_config
) as flow:
    say_hello()

if is_serializable(flow):
    flow.register(project_name="Test", registry_url=flow_storage)
else:
    raise TypeError("Flow did not serialise.")
We are getting the following error from our task logs:
An error occurred (InvalidParameterException) when calling the RunTask operation: Task definition does not support launch_type FARGATE.
In an attempt to resolve this issue I tried adding run_task_kwargs
as above but with no luck.
Does anyone have any pointers?
(I can see from the ECS task definition panel that the Compatibilities
is set to EC2 and Requires compatibilities
is blank and from this thread that could be the cause...)
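One hedged observation (a sketch, not a verified fix): run_task_kwargs are forwarded to ECS RunTask, which doesn't change what the registered task definition supports; requiresCompatibilities is a property of the task definition itself. If that is indeed the cause, passing an explicit task_definition to ECSRun might help; the field names follow the ECS register_task_definition API and the values are illustrative:

from prefect.run_configs import ECSRun

flow_run_config = ECSRun(
    cpu="512",
    memory="512",
    # Illustrative task definition fragment; Prefect merges in its own
    # container/image settings when the task definition is registered.
    task_definition={
        "networkMode": "awsvpc",
        "requiresCompatibilities": ["FARGATE"],
        "cpu": "512",
        "memory": "512",
    },
)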
Jeff Williams
01/18/2021, 9:51 PM
Sai Srikanth
01/18/2021, 11:04 PM
Felix Vemmer
01/18/2021, 11:57 PM
Unexpected error: TypeError("__init__() got an unexpected keyword argument 'client_options'")
Traceback (most recent call last):
File "/Users/felixvemmer/.pyenv/versions/3.8.6/envs/automation_beast/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/Users/felixvemmer/.pyenv/versions/3.8.6/envs/automation_beast/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 891, in get_task_run_state
result = self.result.write(value, **formatting_kwargs)
File "/Users/felixvemmer/.pyenv/versions/3.8.6/envs/automation_beast/lib/python3.8/site-packages/prefect/engine/results/gcs_result.py", line 77, in write
self.gcs_bucket.blob(new.location).upload_from_string(binary_data)
File "/Users/felixvemmer/.pyenv/versions/3.8.6/envs/automation_beast/lib/python3.8/site-packages/prefect/engine/results/gcs_result.py", line 41, in gcs_bucket
client = get_storage_client()
File "/Users/felixvemmer/.pyenv/versions/3.8.6/envs/automation_beast/lib/python3.8/site-packages/prefect/utilities/gcp.py", line 53, in get_storage_client
return get_google_client(storage, credentials=credentials, project=project)
File "/Users/felixvemmer/.pyenv/versions/3.8.6/envs/automation_beast/lib/python3.8/site-packages/prefect/utilities/gcp.py", line 31, in get_google_client
client = Client(project=project, credentials=credentials)
File "/Users/felixvemmer/.pyenv/versions/3.8.6/envs/automation_beast/lib/python3.8/site-packages/google/cloud/storage/client.py", line 122, in __init__
super(Client, self).__init__(
TypeError: __init__() got an unexpected keyword argument 'client_options'
I am running a task that's returning a pd.DataFrame
which I am trying to store into Google Cloud Storage:
pandas_serializer = PandasSerializer(
    file_type='csv'
)
gcs_result = GCSResult(
    bucket='tripliq-data-lake',
    serializer=pandas_serializer,
    location=f'linkedin_top_posts/{datetime.datetime.now().strftime("%Y%m%d-%H%M%S")}_linkedin_post_likes.csv'
)
like_linkedin_feed = LikeLinkedInFeed(
    result=gcs_result
)
I don't understand the source code too well, but I think it's referring to this line in
site-packages/google/cloud/storage/client.py
def __init__(
    self,
    project=_marker,
    credentials=None,
    _http=None,
    client_info=None,
    client_options=None,
):
Any help is very much appreciated!
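A guess rather than a confirmed diagnosis: client_options was added to the shared base client in newer google-cloud-core releases, so this error usually points at an older google-cloud-core mixed with a newer google-cloud-storage in the flow's environment. A quick diagnostic sketch to compare the installed versions:

import pkg_resources

# If google-cloud-core predates client_options support, upgrading it (and
# google-cloud-storage) in the environment that executes the flow should help.
for dist in ("google-cloud-storage", "google-cloud-core", "google-api-core"):
    try:
        print(dist, pkg_resources.get_distribution(dist).version)
    except pkg_resources.DistributionNotFound:
        print(dist, "not installed")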
Alex Rud
01/19/2021, 3:42 AM
Greg Roche
01/19/2021, 2:45 PM
LocalDaskExecutor
flow, using a LocalAgent running inside a Docker image?
TypeError: start() missing 1 required positional argument: 'self'
Edit: solved, I wasn't initialising the LocalDaskExecutor.
flow.executor = LocalDaskExecutor # wrong
flow.executor = LocalDaskExecutor() # this works
Joël Luijmes
01/19/2021, 3:29 PM
Pedro Machado
01/19/2021, 4:07 PM
supervisord
a while ago and was getting a permission error when trying to access the docker engine. I never got it to work.
I'd like to run the Docker Agent inside of a container managed by docker compose.
I have a couple of questions about this set up:
1. I am setting the restart policy to always. Would this be enough to restart the agent if it failed or in case of a restart of the host?
2. What is the best way to give the agent access to the docker engine running on the host?
Thank you!
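On question 2, the common approach (a sketch; the label below is illustrative) is to mount the host's Docker socket into the agent container, e.g. a compose volume /var/run/docker.sock:/var/run/docker.sock, and point the agent at it. On question 1, restart: always does restart the container after failures and after the Docker daemon or host comes back up, as long as the container wasn't manually stopped.

from prefect.agent.docker import DockerAgent

# Assumes the compose service mounts the host's /var/run/docker.sock into the
# container, so the agent drives the host's Docker engine through that socket.
DockerAgent(
    base_url="unix:///var/run/docker.sock",
    labels=["docker-compose"],  # illustrative label
).start()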
SK
01/19/2021, 5:28 PM
SK
01/19/2021, 5:28 PM
BK Lau
01/19/2021, 5:44 PM
Nomad
as executor instead of Dask? I want to use Prefect on the Cloud to drive a Nomad cluster installed on-premise.
Matic Lubej
01/19/2021, 8:18 PM
dask_cloudprovider
API. Running tutorials and sample code from dask works great, but for prefect I have created a dedicated docker image which I provide to the cluster initializer. The cluster gets created, but as soon as the flow starts, after 10 s I get the following time-out error:
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/prefect/engine/runner.py", line 48, in inner
new_state = method(self, state, *args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/prefect/engine/flow_runner.py", line 418, in get_flow_run_state
with self.check_for_cancellation(), executor.start():
File "/usr/local/lib/python3.8/contextlib.py", line 113, in __enter__
return next(self.gen)
File "/usr/local/lib/python3.8/site-packages/prefect/executors/dask.py", line 203, in start
with Client(self.address, **self.client_kwargs) as client:
File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 748, in __init__
self.start(timeout=timeout)
File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 953, in start
sync(self.loop, self._start, **kwargs)
File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 340, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.8/site-packages/distributed/utils.py", line 324, in f
result[0] = yield future
File "/usr/local/lib/python3.8/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1043, in _start
await self._ensure_connected(timeout=timeout)
File "/usr/local/lib/python3.8/site-packages/distributed/client.py", line 1100, in _ensure_connected
comm = await connect(
File "/usr/local/lib/python3.8/site-packages/distributed/comm/core.py", line 308, in connect
raise IOError(
OSError: Timed out trying to connect to <tcp://172.31.40.184:8786> after 10 s
[2021-01-19 20:14:19+0000] ERROR - prefect.Execute process | Unexpected error occured in FlowRunner: OSError('Timed out trying to connect to <tcp://172.31.40.184:8786> after 10 s')
Traceback (most recent call last):
File "s3_process_l2a_2019.py", line 114, in <module>
assert status.is_successful()
AssertionError
Any ideas what is going on? Is the dask scheduler having issues connecting to the workers? Or might it be something else?
Thanks!
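A hedged guess at what's happening: 172.31.x.x is a private VPC address, so whichever process runs the FlowRunner must be able to reach the scheduler on port 8786 (same network, with security groups allowing 8786 and 8787). One common pattern is to let the executor create the cluster at flow-run time instead of connecting to a pre-created address; the image name, worker count and dask_cloudprovider class path below are illustrative:

from prefect import Flow
from prefect.executors import DaskExecutor

# Creating the cluster when the flow runs keeps the Dask Client and scheduler
# in the same network context and tears the cluster down with the flow run.
# Requires dask_cloudprovider to be installed in the execution environment.
executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={
        "image": "my-prefect-dask-image:latest",  # hypothetical image with matching deps
        "n_workers": 4,
    },
)

with Flow("s3-process-l2a", executor=executor) as flow:
    ...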
Billy McMonagle
01/19/2021, 9:29 PM
Chris Jordan
01/19/2021, 10:13 PM
tmpfile
object between tasks, it'll work as expected, as in
@task(name="save records to a tmp file")
def save_some_data(result=PrefectResult()):
logger = prefect.context.get("logger")
the_file = tempfile.NamedTemporaryFile()
for i in range(5000000):
the_file.write(b"lorem ipsum\n")
the_file.seek(0)
<http://logger.info|logger.info>("wrote the file")
return the_file
@task(name="reread that data")
def read_some_data(the_file, result=PrefectResult()):
logger = prefect.context.get("logger")
output = the_file.read()
<http://logger.info|logger.info>(f"read the file")
<http://logger.info|logger.info>(f"length of file is {len(output)}")
with Flow("save_retrieve_file_flow") as flow:
f = save_some_data()
g = read_some_data(f)
but passing the name of the file won't work - the file will not be found
@task(name="save records to a tmp file")
def save_some_data(result=PrefectResult()):
logger = prefect.context.get("logger")
the_file = tempfile.NamedTemporaryFile()
for i in range(500000):
the_file.write(b"lorem ipsum\n")
the_file.seek(0)
<http://logger.info|logger.info>(f"wrote the file to {the_file.name}")
return the_file.name
@task(name="reread that data")
def read_some_data(the_file_name, result=PrefectResult()):
logger = prefect.context.get("logger")
<http://logger.info|logger.info>(f"trying to open {the_file_name}")
with open(the_file_name, 'r') as the_file:
output = the_file.read()
<http://logger.info|logger.info>(f"read the file")
<http://logger.info|logger.info>(f"length of file is {len(output)}")
with Flow("save_retrieve_file_flow2",
state_handlers=[cloud_only_slack_handler]
) as flow:
f = save_some_data()
g = read_some_data(f)
what's going on here? is the file system being reset between tasks? does prefect clean up temporary files in the flow if they're not in memory directly? something else?
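One likely explanation (a sketch of the mechanics, not a definitive diagnosis): NamedTemporaryFile removes the file as soon as the file object is closed or garbage-collected. When only the name is returned, nothing keeps the object alive after the first task finishes, so the file is gone before the second task opens it; with a distributed executor the two tasks may not even share a filesystem. Keeping the file around explicitly behaves differently:

import tempfile

# delete=False keeps the file on disk after the handle is closed or collected,
# so a later task on the same machine can reopen it by name. With a remote or
# multi-process executor, tasks may land on different machines, so passing data
# through results/object storage is safer than local paths.
the_file = tempfile.NamedTemporaryFile(delete=False)
the_file.write(b"lorem ipsum\n")
the_file.close()

with open(the_file.name, "r") as f:
    print(f.read())  # still on disk

# The default (delete=True) removes the file once the object goes away, which is
# what happens when only the_file.name is returned from the first task.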